package xmachinery

import (
	"github.com/RichardKnop/machinery/v2"
	backendsiface "github.com/RichardKnop/machinery/v2/backends/iface"
	brokersiface "github.com/RichardKnop/machinery/v2/brokers/iface"
	"github.com/RichardKnop/machinery/v2/config"
	lockiface "github.com/RichardKnop/machinery/v2/locks/iface"
	"github.com/RichardKnop/machinery/v2/log"
	"github.com/RichardKnop/machinery/v2/tasks"
	"github.com/RichardKnop/machinery/v2/utils"
	"github.com/robfig/cron/v3"
	"sync"
	"time"
	"unsafe"
)

// ServerRef is a layout template used by NewServer: the offset of its lock
// field is added to the address of a *machinery.Server in order to read that
// server's lock through an unsafe pointer.
//
// NOTE(review): this presumably has to mirror the field order and field types
// of machinery.Server exactly — confirm against the machinery version in use
// whenever that dependency is upgraded. Do not reorder, add or remove fields
// here without checking.
type ServerRef struct {
	config            *config.Config
	registeredTasks   *sync.Map
	broker            brokersiface.Broker
	backend           backendsiface.Backend
	lock              lockiface.Lock
	scheduler         *cron.Cron
	prePublishHandler func(*tasks.Signature)
}

// XServer wraps a machinery.Server and adds cron-driven scheduled tasks that
// can be registered, replaced and removed at runtime.
//
// Fields:
//   - sync.Mutex: embedded; the exported scheduled-task methods lock it before
//     touching the scheduler entries or registeredScheduledTasks.
//   - *machinery.Server: the wrapped server; its methods are promoted onto
//     XServer.
//   - scheduler: cron scheduler created and started by NewServer with
//     second-level specs enabled.
//   - machineryLock: the lock value NewServer copies out of the wrapped server;
//     it is acquired before each scheduled send.
//   - registeredScheduledTasks: the currently registered scheduled tasks,
//     keyed by task id.
type XServer struct {
	sync.Mutex
	*machinery.Server
	scheduler                *cron.Cron
	machineryLock            lockiface.Lock
	registeredScheduledTasks map[string]*ScheduledTask
}

// NewServer builds an XServer on top of an existing machinery server.
//
// The wrapped server's lock is read through an unsafe pointer, using the
// offset of ServerRef.lock as the field offset inside machinery.Server. A cron
// scheduler that accepts second-level specs is then created and started
// before the XServer is returned.
func NewServer(server *machinery.Server) *XServer {
	// Locate and copy the lock field of the wrapped server. The uintptr
	// arithmetic stays inside a single expression, as the unsafe package
	// requires for this conversion pattern.
	var layout ServerRef
	lockAddr := unsafe.Pointer(uintptr(unsafe.Pointer(server)) + unsafe.Offsetof(layout.lock))
	machineryLock := *(*lockiface.Lock)(lockAddr)

	// Create and start the second-precision cron scheduler.
	scheduler := cron.New(cron.WithSeconds())
	scheduler.Start()

	return &XServer{
		Server:                   server,
		scheduler:                scheduler,
		machineryLock:            machineryLock,
		registeredScheduledTasks: make(map[string]*ScheduledTask),
	}
}

// RegisterScheduledTask registers a single scheduled task while holding the
// server mutex. The task is passed on by value, and the error reported by the
// registration (if any) is returned unchanged.
func (server *XServer) RegisterScheduledTask(task *ScheduledTask) error {
	server.Mutex.Lock()
	defer server.Mutex.Unlock()

	scheduled := *task
	return server.registerScheduledTask(scheduled)
}

// registerScheduledTask validates task.Spec, replaces whatever is currently
// registered under task.Id and schedules the new task on the cron scheduler.
// It performs no locking itself; the callers in this file hold the server
// mutex around it.
//
// It returns the parse error when task.Spec is not a valid cron expression
// (the existing registration is then left untouched), or the scheduler's
// error when the job cannot be added (nothing is then recorded for task.Id).
// On success it returns nil.
func (server *XServer) registerScheduledTask(task ScheduledTask) error {
	// Validate the cron expression before touching the existing registration.
	schedule, err := secondsParser.Parse(task.Spec)
	if err != nil {
		return err
	}
	// Drop the previous registration for this id, if there is one.
	server.removeScheduledTask(task.Id)
	// Wrap the task in the function the scheduler runs on every tick.
	f := func() {
		// Try to take the task lock, named after the task code and spec. The
		// second argument is one nanosecond before the next scheduled run
		// (presumably the lock's expiry timestamp — confirm against the lock
		// implementation). If the lock cannot be taken this tick is skipped.
		lockErr := server.machineryLock.LockWithRetries(utils.GetLockName(task.TaskCode, task.Spec), schedule.Next(time.Now()).UnixNano()-1)
		if lockErr != nil {
			return
		}
		// Build the task signature and send it.
		signature := task.Signature()
		if _, sendErr := server.SendTask(signature); sendErr != nil {
			log.ERROR.Printf("scheduled task failed. task id is: %s. task name is: %s. error is %s", task.Id, task.TaskCode, sendErr.Error())
		}
	}
	// Add the job to the scheduler.
	entryId, err := server.scheduler.AddFunc(task.Spec, f)
	if err != nil {
		// Never record a task the scheduler rejected: the map would claim a
		// registration that is not actually scheduled.
		return err
	}
	// Remember the task together with its scheduler entry id.
	newTask := NewScheduledTask(task.Id, task.TaskCode, task.Spec, task.TaskQueue, task.Args...)
	newTask.entryId = entryId
	server.registeredScheduledTasks[task.Id] = newTask
	return nil
}

// RegisterScheduledTasks registers several scheduled tasks under a single
// acquisition of the server mutex. Tasks are processed in slice order;
// processing stops at the first failure and that error is returned, leaving
// the tasks registered before it in place.
func (server *XServer) RegisterScheduledTasks(tasks []*ScheduledTask) error {
	server.Mutex.Lock()
	defer server.Mutex.Unlock()

	for i := range tasks {
		if err := server.registerScheduledTask(*tasks[i]); err != nil {
			return err
		}
	}
	return nil
}

// ReloadScheduledTasks makes the set of registered scheduled tasks match the
// given list: registered tasks whose id is absent from the list are removed,
// and listed tasks that are new or not Equal to their registered counterpart
// are (re)registered. When the list contains the same id more than once, the
// last occurrence wins. The first registration error aborts the reload and is
// returned.
func (server *XServer) ReloadScheduledTasks(tasks []*ScheduledTask) error {
	// Index the requested tasks by id.
	wanted := make(map[string]*ScheduledTask, len(tasks))
	for i := range tasks {
		wanted[tasks[i].Id] = tasks[i]
	}

	server.Mutex.Lock()
	defer server.Mutex.Unlock()

	// Phase 1: collect, then remove, registered tasks that were not requested.
	var staleIDs []string
	for _, registered := range server.registeredScheduledTasks {
		if _, keep := wanted[registered.Id]; keep {
			continue
		}
		staleIDs = append(staleIDs, registered.Id)
	}
	for _, id := range staleIDs {
		server.removeScheduledTask(id)
	}

	// Phase 2: collect, then register, requested tasks that are new or changed.
	var pending []*ScheduledTask
	for _, candidate := range wanted {
		current, exists := server.registeredScheduledTasks[candidate.Id]
		if exists && candidate.Equal(current) {
			continue
		}
		pending = append(pending, candidate)
	}
	for _, candidate := range pending {
		if err := server.registerScheduledTask(*candidate); err != nil {
			return err
		}
	}
	return nil
}

// RemoveScheduledTask removes the scheduled task registered under id while
// holding the server mutex. Unknown ids are ignored.
func (server *XServer) RemoveScheduledTask(id string) {
	server.Mutex.Lock()
	defer server.Mutex.Unlock()

	server.removeScheduledTask(id)
}

// removeScheduledTask removes the cron entry of the task registered under id
// and forgets the task. It does nothing when no task is registered under id,
// and it performs no locking of its own.
func (server *XServer) removeScheduledTask(id string) {
	registered, found := server.registeredScheduledTasks[id]
	if !found {
		return
	}
	server.scheduler.Remove(registered.entryId)
	delete(server.registeredScheduledTasks, id)
}

// secondsParser parses cron specs that start with a seconds field, followed
// by minute, hour, day-of-month, month and day-of-week; descriptors are
// accepted as well. It is used to validate a spec before a task is scheduled.
var secondsParser = cron.NewParser(
	cron.Second |
		cron.Minute |
		cron.Hour |
		cron.Dom |
		cron.Month |
		cron.Dow |
		cron.Descriptor,
)
